package com.braska.grave.netty.handler;

import com.alibaba.fastjson.JSON;
import com.braska.grave.netty.manager.NettyChannelManager;
import com.braska.grave.netty.model.Request;
import com.braska.grave.netty.model.Response;
import com.google.common.collect.Lists;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * @author shenyuhang
 * @date 2020/3/31
 **/
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());

    private ConcurrentHashMap<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
    private NettyChannelManager manager;
    private String remoteAddress;

    public NettyClientHandler(NettyChannelManager manager, String remoteAddress) {
        this.manager = manager;
        this.remoteAddress = remoteAddress;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("已连接到netty服务器." + ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
        logger.info("与netty服务器断开连接." + address);
        ctx.channel().close();
        manager.remove(ctx.channel());

        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> {
            manager.refresh(Lists.newArrayList(remoteAddress));
        }, 1L, TimeUnit.SECONDS);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Response response = JSON.parseObject(msg.toString(), Response.class);
        String requestId = response.getRequestId();
        SynchronousQueue<Object> queue = queueMap.get(requestId);
        queue.put(response);
        queueMap.remove(requestId);
    }

    public SynchronousQueue<Object> sendRequest(Request request, Channel channel) {
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        queueMap.put(request.getId(), queue);
        channel.writeAndFlush(request);
        return queue;
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        logger.info("发送心跳消息...");
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) {
                Request request = new Request();
                request.setMethodName("heartBeat");
                ctx.channel().writeAndFlush(request);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.info("通信服务器发生异常." + cause);
        ctx.channel().close();
    }
}
